/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.pherf;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.jcabi.jdbc.JdbcSession;
import com.jcabi.jdbc.Outcome;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
import org.apache.phoenix.pherf.configuration.Column;
import org.apache.phoenix.pherf.configuration.DataModel;
import org.apache.phoenix.pherf.configuration.DataTypeMapping;
import org.apache.phoenix.pherf.configuration.Scenario;
import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.apache.phoenix.pherf.workload.QueryExecutor;
import org.apache.phoenix.pherf.workload.Workload;
import org.apache.phoenix.pherf.workload.WorkloadExecutor;
import org.apache.phoenix.pherf.workload.WriteWorkload;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(NeedsOwnMiniClusterTest.class)
public class DataIngestIT extends ResultBaseTestIT {

  private Properties properties;

  @Before
  public void applySchema() throws Exception {
    reader.applySchema();
    resources = new ArrayList<>(reader.getResourceList());

    assertTrue("Could not pull list of schema files.", resources.size() > 0);
    assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
    properties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, true);
  }

  @Test
  public void testColumnRulesApplied() {

    Scenario scenario = null;
    try {
      scenario = parser.getScenarioByName("testScenario");
      List<Column> columnListFromPhoenix = util.getColumnsFromPhoenix(scenario.getSchemaName(),
        scenario.getTableNameWithoutSchemaName(), util.getConnection());
      assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);

      WriteWorkload loader =
        new WriteWorkload(util, parser, properties, scenario, GeneratePhoenixStats.NO);
      WorkloadExecutor executor = new WorkloadExecutor();
      executor.add(loader);
      executor.get();
      executor.shutdown();

      RulesApplier rulesApplier = loader.getRulesApplier();
      List<Map> modelList = rulesApplier.getModelList();
      assertTrue("Could not generate the modelList", modelList.size() > 0);

      for (Column column : columnListFromPhoenix) {
        DataValue data = rulesApplier.getDataForRule(scenario, column);

        // We are generating data values
        // so the value should have been specified by this point.
        assertTrue("Failed to retrieve data for column type: " + column.getType(), data != null);

        // Test that we still retrieve the GENERAL_CHAR rule even after an override is
        // applied to another CHAR type. NEWVAL_STRING Column does not specify an override
        // so we should get the default rule.
        if (
          (column.getType() == DataTypeMapping.VARCHAR)
            && (column.getName().equals("NEWVAL_STRING"))
        ) {
          assertTrue("Failed to retrieve data for column type: ",
            data.getDistribution() == Integer.MIN_VALUE);
        }
      }

      // Verify number of rows written
      assertExpectedNumberOfRecordsWritten(scenario);

      // Run some queries
      executor = new WorkloadExecutor();
      Workload query = new QueryExecutor(parser, util, executor);
      executor.add(query);
      executor.get();
      executor.shutdown();
      PhoenixUtil.create().deleteTables("ALL");
    } catch (Exception e) {
      fail("We had an exception: " + e.getMessage());
    }
  }

  @Test
  public void testPreAndPostDataLoadDdls() throws Exception {
    Scenario scenario = parser.getScenarioByName("testPreAndPostDdls");
    WorkloadExecutor executor = new WorkloadExecutor();
    executor.add(new WriteWorkload(util, parser, properties, scenario, GeneratePhoenixStats.NO));

    try {
      executor.get();
      executor.shutdown();
    } catch (Exception e) {
      fail("Failed to load data. An exception was thrown: " + e.getMessage());
    }

    assertExpectedNumberOfRecordsWritten(scenario);
  }

  @Test
  public void testRWWorkload() throws Exception {

    Connection connection = util.getConnection();

    WorkloadExecutor executor = new WorkloadExecutor();
    DataModel dataModel = parser.getDataModelByName("test_scenario");
    List<DataModel> dataModels = new ArrayList<>();
    dataModels.add(dataModel);
    QueryExecutor qe = new QueryExecutor(parser, util, executor, dataModels, null, false);
    executor.add(qe);
    Scenario scenario = parser.getScenarioByName("testScenarioRW");

    String sql = "select count(*) from " + scenario.getTableName();

    try {
      // Wait for data to load up.
      executor.get();
      executor.shutdown();

      // Verify data has been loaded
      Integer count = new JdbcSession(connection).sql(sql).select(new Outcome<Integer>() {
        @Override
        public Integer handle(ResultSet resultSet, Statement statement) throws SQLException {
          while (resultSet.next()) {
            return resultSet.getInt(1);
          }
          return null;
        }
      });
      assertNotNull("Could not retrieve count. " + count);

      // It would be better to sum up all the rowcounts for the scenarios, but this is fine
      assertTrue("Could not query any rows for in " + scenario.getTableName(), count > 0);
    } catch (Exception e) {
      fail("Failed to load data. An exception was thrown: " + e.getMessage());
    }
  }

  @Test
  /**
   * Validates that Pherf can write data to a Multi-Tenant View in addition to standard Phoenix
   * tables.
   */
  public void testMultiTenantViewWriteWorkload() throws Exception {
    // Arrange
    Scenario scenario = parser.getScenarioByName("testMTWriteScenario");
    WorkloadExecutor executor = new WorkloadExecutor();
    executor.add(new WriteWorkload(util, parser, properties, scenario, GeneratePhoenixStats.NO));

    // Act
    try {
      // Wait for data to load up.
      executor.get();
      executor.shutdown();
    } catch (Exception e) {
      fail("Failed to load data. An exception was thrown: " + e.getMessage());
    }

    assertExpectedNumberOfRecordsWritten(scenario);
  }

  @Test
  public void testMultiTenantScenarioRunBeforeWriteWorkload() throws Exception {
    // Arrange
    Scenario scenario = parser.getScenarioByName("testMTDdlWriteScenario");
    WorkloadExecutor executor = new WorkloadExecutor();
    executor.add(new WriteWorkload(util, parser, properties, scenario, GeneratePhoenixStats.NO));

    // Act
    try {
      // Wait for data to load up.
      executor.get();
      executor.shutdown();
    } catch (Exception e) {
      fail("Failed to load data. An exception was thrown: " + e.getMessage());
    }

    assertExpectedNumberOfRecordsWritten(scenario);
  }

  private void assertExpectedNumberOfRecordsWritten(Scenario scenario) throws Exception {
    Connection connection = util.getConnection(scenario.getTenantId());
    String sql = "select count(*) from " + scenario.getTableName();
    Integer count = new JdbcSession(connection).sql(sql).select(new Outcome<Integer>() {
      @Override
      public Integer handle(ResultSet resultSet, Statement statement) throws SQLException {
        while (resultSet.next()) {
          return resultSet.getInt(1);
        }
        return null;
      }
    });
    assertNotNull("Could not retrieve count. ", count);
    assertEquals("Expected 100 rows to have been inserted", scenario.getRowCount(),
      count.intValue());
  }

}
